package com.om.opensourway;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Properties;

/**
 * 写入数据到 Kafka 中
 */
public class AnalysisForStreaming {

    static Properties conf = new Properties();

    public static void main(String[] args) throws URISyntaxException, IOException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {


        TopologyBuilder builder = new TopologyBuilder();
        //reading resource from jar
        InputStream resourceAsStream = AnalysisForStreaming.class.getResourceAsStream("/conf.properties");
        conf.load(resourceAsStream);


        builder.setSpout(conf.getProperty("storm.SpoutName"), new KafkaSpout<>(getKafkaSpoutConfig(conf.getProperty("kafka.bootstrap.serverswithport"), conf.getProperty("kafka.topic"))), 1);
        builder.setBolt(conf.getProperty("storm.BoltName"), new giteeBolt()).shuffleGrouping(conf.getProperty("storm.SpoutName"));


        Config map = new Config();
        map.put("max.poll.records", 8);
        map.put("max.poll.interval.ms", "30000");
        map.put("session.timeout.ms", "30000");
        StormSubmitter.submitTopology(conf.getProperty("strom.TopologyName"),
                map, builder.createTopology());
    }


    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
        return KafkaSpoutConfig.builder(bootstrapServers, topic)
                // 除了分组 ID,以下配置都是可选的。分组 ID 必须指定,否则会抛出 InvalidGroupIdException 异常
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, conf.getProperty("kafka.group.id"))
                .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,conf.getProperty("session.timeout.ms"))
                .setProp(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,conf.getProperty("max.poll.interval.ms"))
                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,conf.getProperty("max.poll.records"))
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName())
                .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")
                // 定义重试策略
                .setRetry(getRetryService())
                // 定时提交偏移量的时间间隔,默认是 15s
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    // 定义重试策略
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }
}
